package com.go.utils;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
 * ClassName: FlinkUtils
 * Description:
 * Date: 2022/1/6
 * @author: Cason
 */
public class FlinkUtils {

    /** Shared execution environment reused by every stream created through this utility. */
    public static final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * Creates a Kafka-backed {@link DataStream} configured from a properties file.
     *
     * <p>Required keys in the file: {@code kafka.input.topics} (comma-separated list),
     * {@code checkpoint.interval} (ms), {@code checkpoint.path} (filesystem checkpoint URI),
     * plus the usual Kafka consumer properties (bootstrap servers, group id, ...), which are
     * passed through verbatim to the consumer.
     *
     * @param path  path to the properties file
     * @param clazz deserialization-schema class; must have an accessible no-arg constructor
     * @param <T>   element type produced by the deserialization schema
     * @return a source stream reading from the configured topics
     * @throws IOException            if the properties file cannot be read
     * @throws InstantiationException if {@code clazz} cannot be instantiated (no accessible
     *                                no-arg constructor, abstract class, or the constructor threw)
     * @throws IllegalAccessException if the constructor is not accessible
     */
    public static <T> DataStream<T> createKafkaStream(String path, Class<? extends KafkaDeserializationSchema<T>> clazz) throws IOException, InstantiationException, IllegalAccessException {

        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(path);
        // getRequired fails fast with a descriptive message instead of an NPE on split()
        // when the key is absent (consistent with checkpoint.path below).
        String[] topics = parameterTool.getRequired("kafka.input.topics").split(",");
        long ckInterval = parameterTool.getLong("checkpoint.interval");
        String ckPath = parameterTool.getRequired("checkpoint.path");

        environment.enableCheckpointing(ckInterval)
                .setStateBackend(new FsStateBackend(ckPath));

        // All entries of the properties file are forwarded as Kafka consumer config.
        Properties properties = parameterTool.getProperties();

        // Class.newInstance() is deprecated (it propagates any constructor exception
        // unchecked); use getDeclaredConstructor().newInstance() and map the extra
        // reflective failures back onto the declared throws list.
        KafkaDeserializationSchema<T> deserializationSchema;
        try {
            deserializationSchema = clazz.getDeclaredConstructor().newInstance();
        } catch (InstantiationException | IllegalAccessException e) {
            throw e;
        } catch (ReflectiveOperationException e) {
            throw (InstantiationException) new InstantiationException(
                    "Failed to instantiate " + clazz.getName()).initCause(e);
        }

        FlinkKafkaConsumer<T> flinkKafkaConsumer =
                new FlinkKafkaConsumer<>(Arrays.asList(topics), deserializationSchema, properties);
        // Offsets are restored from Flink checkpoints, not committed back to Kafka,
        // so Kafka-side consumer lag tooling will not reflect checkpointed progress.
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        return environment.addSource(flinkKafkaConsumer);
    }
}
